
[KYUUBI #6830] Allow indicate advisory shuffle partition size when me… #6831

Closed
wants to merge 1 commit

Conversation

yabola
Contributor

@yabola yabola commented Dec 2, 2024

Why are the changes needed?

When merging small files (with spark.sql.optimizer.insertRepartitionBeforeWrite.enabled=true), the default session advisory partition size (64MB) is used as the target. This default can still produce small files, because the written data compresses well in columnar file formats (usually 1/4 or less of the shuffle exchange size; the resulting files are often around 15MB).

Spark now supports configuring the advisory size on the rebalance expression (apache/spark#40421), so we can add a configuration that controls the merge target size separately.
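
For illustration, a minimal Scala sketch of the setup described above, assuming the Kyuubi Spark SQL extension is on the classpath (the extension class name follows the Kyuubi docs; verify it for your version). The 64MB figure is Spark's default for spark.sql.adaptive.advisoryPartitionSizeInBytes:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("merge-small-files-demo")
  .config("spark.sql.extensions", "org.apache.kyuubi.sql.KyuubiSparkSQLExtension")
  // Insert a rebalance before writes so AQE can coalesce small output partitions.
  .config("spark.sql.optimizer.insertRepartitionBeforeWrite.enabled", "true")
  .getOrCreate()

// Without further tuning, AQE targets the session-wide default of 64MB of
// row-oriented shuffle data per partition, which often compresses down to
// roughly 15MB per written Parquet file.
println(spark.conf.get("spark.sql.adaptive.advisoryPartitionSizeInBytes", "64MB"))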

Was this patch authored or co-authored using generative AI tooling?

no

@pan3793
Member

pan3793 commented Dec 2, 2024

The use case is already covered by:

spark.sql.optimizer.finalStageConfigIsolation.enabled=true
spark.sql.finalStage.adaptive.advisoryPartitionSizeInBytes=512m

https://kyuubi.readthedocs.io/en/master/extensions/engines/spark/rules.html#additional-configurations
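
For example, a hedged sketch of applying those two settings at runtime on an existing SparkSession (property names as in the snippet above; requires the Kyuubi Spark SQL extension to be installed):

// Enable final-stage config isolation so the write (final) stage can use its
// own advisory partition size, separate from the rest of the query.
spark.conf.set("spark.sql.optimizer.finalStageConfigIsolation.enabled", "true")
// Target ~512MB of shuffle data per final-stage partition; after columnar
// compression the written files typically end up well below this.
spark.conf.set("spark.sql.finalStage.adaptive.advisoryPartitionSizeInBytes", "512m")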

@codecov-commenter

codecov-commenter commented Dec 2, 2024

Codecov Report

All modified and coverable lines are covered by tests ✅

Project coverage is 0.00%. Comparing base (c391d16) to head (345b58e).
Report is 19 commits behind head on master.

Additional details and impacted files
@@          Coverage Diff           @@
##           master   #6831   +/-   ##
======================================
  Coverage    0.00%   0.00%           
======================================
  Files         687     687           
  Lines       42442   42439    -3     
  Branches     5793    5792    -1     
======================================
+ Misses      42442   42439    -3     


@yabola
Contributor Author

yabola commented Dec 2, 2024

@pan3793 Yes, I hadn't noticed that before.
Another question: could we add compression ratios so that different advisoryPartitionSizeInBytes values can be derived for different file formats (Parquet, ORC, Avro, text, etc.)? That would make it more automated. Iceberg has similar functionality. If you think it's okay, I can work on it.

@pan3793
Member

pan3793 commented Dec 4, 2024

@yabola The real written size is affected by several things, e.g. the input data's compression codec and level (the input might come from shuffle or directly from other data sources), the data itself, the output format as you mentioned, and the write compression codec and level. I don't think we can estimate that automatically and correctly.

@yabola
Contributor Author

yabola commented Dec 6, 2024

@pan3793 Hmm, but in the scenario of merging small files, we only need to consider the shuffle data size (this rule only applies to shuffle data being written to files, so it doesn't matter what the data source is).
Because of the row-oriented layout and size-estimation method of shuffle data, there is still a significant difference between the shuffle size and the actual written file size, especially for Parquet, where the files are usually less than 1/3 of the shuffle data size.
Iceberg's implementation:
https://github.com/apache/iceberg/blob/38c8daa4eae8a75ab46571f1efce1609100f53dd/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkCompressionUtil.java#L60-L69
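
For reference, a rough Scala sketch of the idea under discussion. The ratios below are hypothetical placeholders, not the values Iceberg actually uses; see the linked SparkCompressionUtil for the real numbers:

// Illustrative only: estimate the shuffle-side advisory partition size from a
// desired on-disk file size and an assumed format-specific compression ratio.
def advisoryShuffleSize(targetFileSizeBytes: Long, format: String): Long = {
  val assumedRatio = format.toLowerCase match {
    case "parquet" | "orc" => 3.0  // hypothetical: columnar formats shrink shuffle rows a lot
    case "avro"            => 1.5  // hypothetical
    case _                 => 1.0  // e.g. text: assume no gain
  }
  (targetFileSizeBytes * assumedRatio).toLong
}

// e.g. to end up near 128MB Parquet files, aim for ~384MB of shuffle data.
val advisory = advisoryShuffleSize(128L * 1024 * 1024, "parquet")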

@pan3793
Member

pan3793 commented Dec 6, 2024

in the scenario of merging small files, we only need to consider the shuffle data size (this rule only applies to shuffle data being written to files, so it doesn't matter what the data source is).

I overlooked this; you are right.

I read Iceberg's code and understand how it works. I am a little bit pessimistic about adopting it, because the real compression ratio is affected by the data itself; the experience-based assumption about the compression ratio is not always true, and when the estimation deviates significantly from the real value, it's hard to explain to users how that happened. Files written by a Spark job are likely read by other Spark jobs, where the data will be converted to Spark's InternalRow layout (same as shuffle) again. Has the compression ratio been considered on the read code path too?

Instead of setting advisoryPartitionSizeInBytes automatically, I think it would be nice to add a section to the docs explaining how advisoryPartitionSizeInBytes affects the written file size, with some tuning guidance.
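
As a hedged illustration of what such guidance might look like, using the roughly-1/3 ratio observed earlier in this thread (actual ratios vary with the data and codecs):

// Back-of-envelope tuning: if Parquet output is about 1/3 of the shuffle size,
// a 512MB final-stage advisory size yields files of roughly 170MB.
val observedRatio      = 1.0 / 3.0                           // written size / shuffle size
val advisoryShuffleMB  = 512
val expectedFileSizeMB = advisoryShuffleMB * observedRatio   // ≈ 170MB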

DISCLAIMER: I'm not fully against adding the proposed feature if other people think it's a good idea, especially if they can provide some cases showing actual benefits, as long as the feature is disabled by default.

@yabola
Contributor Author

yabola commented Dec 11, 2024

@pan3793 I will close this; if anyone finds it helpful, they can revisit it.
In addition, we could treat the situation as simpler than it looks: the recommended file size for Parquet (and other formats) is generally 128MB, and the written files are almost certainly smaller than the shuffle size, so a larger advisory size is enough as a general rule (though format differences are not taken into account, so it may not be general enough).

@yabola yabola closed this Dec 11, 2024